package com.zmh.wt.healthmonitor.mqtt;

import cn.hutool.core.util.CharsetUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.zmh.wt.healthmonitor.config.MqttConfiguration;
import com.zmh.wt.healthmonitor.entity.MqttMssg;

import com.zmh.wt.healthmonitor.entity.RealtimeDO;
import com.zmh.wt.healthmonitor.service.RealTimeService;
import com.zmh.wt.healthmonitor.service.impl.RealtimeServiceImpl;
import com.zmh.wt.healthmonitor.util.SpringUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.sql.Timestamp;
import java.util.Date;

/**
 * Paho MQTT callback that keeps the client connected and persists device
 * telemetry received on the {@code publish} topic.
 *
 * <p>Payloads on that topic are expected to be a {@code ;}-separated record with
 * at least eight fields, in this order: device id, cycle count, device state,
 * fault count, cycle, feature1, feature2, feature3.
 *
 * <p>NOTE(review): this class is annotated {@code @Component} but its only
 * constructor takes an {@code MQTTClient}, and {@code mqttConfiguration} is
 * resolved through {@code SpringUtils} in a field initializer. Both rely on the
 * Spring context being ready when an instance is created — confirm against the
 * code that instantiates this callback.
 */
@Component
public class MyMqttCallback implements MqttCallbackExtended {

    /** Topic whose payloads are parsed and stored. */
    private static final String DATA_TOPIC = "publish";

    /** Separator between the fields of a telemetry payload. */
    private static final String FIELD_SEPARATOR = ";";

    /** Minimum number of fields a telemetry payload must contain. */
    private static final int EXPECTED_FIELD_COUNT = 8;

    /** Pause between two reconnect attempts, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MILLIS = 5000L;

    Logger log = LoggerFactory.getLogger(MyMqttCallback.class);
    private MQTTClient mqttClient;

    MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);

    /** Most recently parsed telemetry message; replaced on every valid payload. */
    MqttMssg mqttMssg = new MqttMssg();

    public static MyMqttCallback mqttCallback;

    /**
     * Creates a callback bound to the given client wrapper.
     *
     * @param mqttClient the client wrapper this callback belongs to
     */
    public MyMqttCallback(MQTTClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    /**
     * Blocks the calling thread and retries {@code reconnect()} every five seconds
     * until the shared client reports that it is connected again, or until the
     * thread is interrupted.
     *
     * @param throwable the reason the connection was lost; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Pass the throwable as the trailing argument so the stack trace is logged too.
        log.error("mqtt connectionLost 连接断开，5S之后尝试重连: {}",
                throwable == null ? null : throwable.getMessage(), throwable);
        long reconnectTimes = 0;
        while (true) {
            try {
                if (MQTTClient.getClient().isConnected()) {
                    // Reconnected. Topics could be re-subscribed here or in connectComplete();
                    // nothing in this class currently re-subscribes.
                    log.warn("mqtt reconnect success end  重新连接  重新订阅成功");
                    return;
                }
                reconnectTimes += 1;
                log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
                MQTTClient.getClient().reconnect();
            } catch (MqttException e) {
                log.error("mqtt断连异常", e);
            }
            try {
                Thread.sleep(RECONNECT_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Honour cancellation: restore the flag and stop retrying instead of
                // spinning forever on a thread that was asked to stop.
                Thread.currentThread().interrupt();
                log.warn("mqtt reconnect loop interrupted after {} attempts, giving up", reconnectTimes);
                return;
            }
        }
    }

    /**
     * Logs every incoming message and stores those that arrive on the
     * {@code publish} topic.
     *
     * @param topic   topic the message was published to
     * @param message the received message
     * @throws Exception if decoding or persisting the message fails; malformed
     *                   payloads are logged and dropped rather than thrown
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode with the same charset insertCmdResults uses, not the platform default.
        log.info("接收消息主题 : {}，接收消息内容 : {}", topic, new String(message.getPayload(), CharsetUtil.UTF_8));
        // Only the data topic is persisted; constant-first equals is null-safe.
        if (DATA_TOPIC.equals(topic)) {
            insertCmdResults(topic, message);
        }
    }

    /**
     * Parses a telemetry payload, remembers it in {@link #mqttMssg} and stores the
     * derived real-time record through {@code RealTimeService}.
     *
     * <p>A payload with fewer than eight fields, or with a non-numeric value in a
     * numeric field, is logged and skipped. Letting that exception escape
     * {@code messageArrived} would make the Paho client shut down.
     *
     * @param topic   topic the message arrived on (used for logging only)
     * @param message the received message
     * @throws Exception if decoding the payload or persisting the record fails
     */
    private void insertCmdResults(String topic, MqttMessage message) throws Exception {
        String payload = new String(message.getPayload(), CharsetUtil.UTF_8);
        String[] fields = payload.split(FIELD_SEPARATOR);
        if (fields.length < EXPECTED_FIELD_COUNT) {
            log.error("Discarding MQTT payload on topic {}: expected at least {} fields but got {}: {}",
                    topic, EXPECTED_FIELD_COUNT, fields.length, payload);
            return;
        }

        // One clock read so both records carry the same receive time.
        long receivedMillis = System.currentTimeMillis();

        MqttMssg parsedMessage;
        RealtimeDO realtimeDO;
        try {
            String deviceId = fields[0];
            int deviceCycleCount = Integer.parseInt(fields[1]);
            // device_state: 0 = normal, 1 = fault, 2 = connected, 9 = not connected
            int deviceState = Integer.parseInt(fields[2]);
            int deviceFaultCount = Integer.parseInt(fields[3]);
            double deviceCycle = Double.parseDouble(fields[4]);
            double deviceFeature1 = Double.parseDouble(fields[5]);
            double deviceFeature2 = Double.parseDouble(fields[6]);
            double deviceFeature3 = Double.parseDouble(fields[7]);

            parsedMessage = MqttMssg.builder()
                    .id(deviceId)
                    .cyclecount(deviceCycleCount)
                    .state(deviceState)
                    .faultcount(deviceFaultCount)
                    .cycle(deviceCycle)
                    .feature1(deviceFeature1)
                    .feature2(deviceFeature2)
                    .feature3(deviceFeature3)
                    .sendtime(new Timestamp(receivedMillis))
                    .build();

            // Record to insert/update in storage.
            // NOTE(review): the wind farm id "10001" and the 13/7 scaling of feature1
            // are hard-coded here; their meaning is not visible in this file — confirm.
            realtimeDO = RealtimeDO.builder()
                    .windfarm("10001")
                    .windturbine(Integer.parseInt(deviceId))
                    .status(deviceState)
                    .feature1(deviceFeature1 * 13 / 7)
                    .feature2(deviceFeature2)
                    .feature3(deviceFeature3)
                    .gmtReceived(new Timestamp(receivedMillis))
                    .build();
        } catch (NumberFormatException e) {
            log.error("Discarding MQTT payload on topic {}: non-numeric field in: {}", topic, payload, e);
            return;
        }

        // Publish the parsed message only once the whole payload is known to be valid.
        mqttMssg = parsedMessage;

        ApplicationContext context = SpringUtils.getApplicationContext();
        RealTimeService realTimeService = context.getBean(RealTimeService.class);
        realTimeService.insertRealtimeData(realtimeDO);

        log.info("数据入库:{}", parsedMessage.toString());
    }

    /**
     * Logs whether delivery of a published message has completed.
     *
     * @param token delivery token of the published message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("==========deliveryComplete={}==========", token.isComplete());
    }

    /**
     * Invoked when a connection has been established. Intentionally does nothing
     * at present; no topics are (re-)subscribed here.
     *
     * @param b whether the connection is the result of a reconnect
     * @param s URI of the server the connection was made to
     */
    @Override
    public void connectComplete(boolean b, String s) {
        // No action on connect.
    }
}
